package com.it.windows;

import com.it.operator.utils.SourceUtils;
import com.it.pojo.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.HashSet;

/**
 * Computes the PV/UV ratio (number of events divided by number of distinct users)
 * over sliding event-time windows and prints it.
 *
 * @author code1997
 */
public class WindowRvUvTest {

    /**
     * Builds and runs the job: reads events, assigns timestamps and watermarks, prints the raw
     * events, then prints the PV/UV ratio of every sliding window.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A zero out-of-orderness bound treats the stream as ordered by timestamp;
        // the event time is taken from the event's own timestamp field.
        WatermarkStrategy<Event> watermarks = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);

        SingleOutputStreamOperator<Event> eventSource =
                SourceUtils.getEventSource(env).assignTimestampsAndWatermarks(watermarks);

        // Echo every incoming event.
        eventSource.print();

        // 10-second windows that slide every 2 seconds.
        SlidingEventTimeWindows windows =
                SlidingEventTimeWindows.of(Time.seconds(10L), Time.seconds(2L));

        // A constant key routes all events into the same keyed window.
        eventSource.keyBy(event -> true)
                .window(windows)
                .aggregate(new PvUvAggregateFunctionClass())
                .print();

        env.execute();
    }

    /**
     * Incrementally aggregates events into a PV/UV ratio.
     *
     * <p>The accumulator is a pair of (event count, set of distinct user names); the result is
     * the event count divided by the size of the user set.
     */
    static class PvUvAggregateFunctionClass implements AggregateFunction<Event, Tuple2<Long, HashSet<String>>, Double> {

        /**
         * @return a fresh accumulator with a zero count and an empty user set
         */
        @Override
        public Tuple2<Long, HashSet<String>> createAccumulator() {
            HashSet<String> users = new HashSet<>();
            return Tuple2.of(0L, users);
        }

        /**
         * Records one event: remembers its user and increments the count.
         *
         * @param value       the incoming event
         * @param accumulator the current accumulator; its user set is updated in place
         * @return a new tuple holding the incremented count and the same user set
         */
        @Override
        public Tuple2<Long, HashSet<String>> add(Event value, Tuple2<Long, HashSet<String>> accumulator) {
            HashSet<String> users = accumulator.f1;
            users.add(value.user);
            long pageViews = accumulator.f0 + 1;
            return Tuple2.of(pageViews, users);
        }

        /**
         * @param accumulator the final accumulator of a window
         * @return the event count divided by the number of distinct users
         */
        @Override
        public Double getResult(Tuple2<Long, HashSet<String>> accumulator) {
            double pageViews = accumulator.f0;
            int uniqueUsers = accumulator.f1.size();
            return pageViews / uniqueUsers;
        }

        /**
         * Combines two accumulators by summing the counts and taking the union of the user sets.
         *
         * @param a the first accumulator; its user set is reused and receives the users of {@code b}
         * @param b the second accumulator
         * @return a new tuple holding the summed count and the merged user set
         */
        @Override
        public Tuple2<Long, HashSet<String>> merge(Tuple2<Long, HashSet<String>> a, Tuple2<Long, HashSet<String>> b) {
            HashSet<String> mergedUsers = a.f1;
            mergedUsers.addAll(b.f1);
            long totalPageViews = a.f0 + b.f0;
            return Tuple2.of(totalPageViews, mergedUsers);
        }
    }

}
